/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.testing;



import com.google.auto.value.AutoValue;

import com.bff.gaia.unified.sdk.Pipeline;

import com.bff.gaia.unified.sdk.PipelineRunner;

import com.bff.gaia.unified.sdk.annotations.Internal;

import com.bff.gaia.unified.sdk.coders.*;

import com.bff.gaia.unified.sdk.schemas.Schema;

import com.bff.gaia.unified.sdk.schemas.SchemaCoder;

import com.bff.gaia.unified.sdk.transforms.PTransform;

import com.bff.gaia.unified.sdk.transforms.SerializableFunction;

import com.bff.gaia.unified.sdk.transforms.windowing.BoundedWindow;

import com.bff.gaia.unified.sdk.values.*;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableList;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Iterables;

import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.coders.DurationCoder;

import com.bff.gaia.unified.sdk.coders.InstantCoder;

import com.bff.gaia.unified.sdk.coders.StructuredCoder;

import com.bff.gaia.unified.sdk.coders.VarIntCoder;

import com.bff.gaia.unified.sdk.values.PBegin;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.sdk.values.Row;

import com.bff.gaia.unified.sdk.values.TimestampedValue;

import com.bff.gaia.unified.sdk.values.WindowingStrategy;

import org.joda.time.Duration;

import org.joda.time.Instant;



import java.io.IOException;

import java.io.InputStream;

import java.io.OutputStream;

import java.util.ArrayList;

import java.util.Collections;

import java.util.List;

import java.util.Objects;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkArgument;

import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkNotNull;



/**

 * A testing input that generates an unbounded {@link PCollection} of elements, advancing the

 * watermark and processing time as elements are emitted. After all of the specified elements are

 * emitted, ceases to produce output.

 *

 * <p>Each call to a {@link TestStream.Builder} method will only be reflected in the state of the

 * {@link Pipeline} after each method before it has completed and no more progress can be made by

 * the {@link Pipeline}. A {@link PipelineRunner} must ensure that no more progress can be made in

 * the {@link Pipeline} before advancing the state of the {@link TestStream}.

 */

public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {

  private final List<Event<T>> events;

  private final Coder<T> coder;



  /**

   * Create a new {@link TestStream.Builder} with no elements and watermark equal to {@link

   * BoundedWindow#TIMESTAMP_MIN_VALUE}.

   */

  public static <T> Builder<T> create(Coder<T> coder) {

    return new Builder<>(coder);

  }



  public static <T> Builder<T> create(

      Schema schema,

      SerializableFunction<T, Row> toRowFunction,

      SerializableFunction<Row, T> fromRowFunction) {

    return create(SchemaCoder.of(schema, toRowFunction, fromRowFunction));

  }



  private TestStream(Coder<T> coder, List<Event<T>> events) {

    this.coder = coder;

    this.events = checkNotNull(events);

  }



  /**

   * An incomplete {@link TestStream}. Elements added to this builder will be produced in sequence

   * when the pipeline created by the {@link TestStream} is run.

   */

  public static class Builder<T> {

    private final Coder<T> coder;

    private final ImmutableList<Event<T>> events;

    private final Instant currentWatermark;



    private Builder(Coder<T> coder) {

      this(coder, ImmutableList.of(), BoundedWindow.TIMESTAMP_MIN_VALUE);

    }



    private Builder(Coder<T> coder, ImmutableList<Event<T>> events, Instant currentWatermark) {

      this.coder = coder;

      this.events = events;

      this.currentWatermark = currentWatermark;

    }



    /**

     * Adds the specified elements to the source with timestamp equal to the current watermark.

     *

     * @return A {@link TestStream.Builder} like this one that will add the provided elements after

     *     all earlier events have completed.

     */

    @SafeVarargs

    public final Builder<T> addElements(T element, T... elements) {

      TimestampedValue<T> firstElement = TimestampedValue.of(element, currentWatermark);

      @SuppressWarnings({"unchecked", "rawtypes"})

      TimestampedValue<T>[] remainingElements = new TimestampedValue[elements.length];

      for (int i = 0; i < elements.length; i++) {

        remainingElements[i] = TimestampedValue.of(elements[i], currentWatermark);

      }

      return addElements(firstElement, remainingElements);

    }



    /**

     * Adds the specified elements to the source with the provided timestamps.

     *

     * @return A {@link TestStream.Builder} like this one that will add the provided elements after

     *     all earlier events have completed.

     */

    @SafeVarargs

    public final Builder<T> addElements(

		TimestampedValue<T> element, TimestampedValue<T>... elements) {

      checkArgument(

          element.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),

          "Elements must have timestamps before %s. Got: %s",

          BoundedWindow.TIMESTAMP_MAX_VALUE,

          element.getTimestamp());

      for (TimestampedValue<T> multiElement : elements) {

        checkArgument(

            multiElement.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),

            "Elements must have timestamps before %s. Got: %s",

            BoundedWindow.TIMESTAMP_MAX_VALUE,

            multiElement.getTimestamp());

      }

      ImmutableList<Event<T>> newEvents =

          ImmutableList.<Event<T>>builder()

              .addAll(events)

              .add(ElementEvent.add(element, elements))

              .build();

      return new Builder<>(coder, newEvents, currentWatermark);

    }



    /**

     * Advance the watermark of this source to the specified instant.

     *

     * <p>The watermark must advance monotonically and cannot advance to {@link

     * BoundedWindow#TIMESTAMP_MAX_VALUE} or beyond.

     *

     * @return A {@link TestStream.Builder} like this one that will advance the watermark to the

     *     specified point after all earlier events have completed.

     */

    public Builder<T> advanceWatermarkTo(Instant newWatermark) {

      checkArgument(

          !newWatermark.isBefore(currentWatermark), "The watermark must monotonically advance");

      checkArgument(

          newWatermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),

          "The Watermark cannot progress beyond the maximum. Got: %s. Maximum: %s",

          newWatermark,

          BoundedWindow.TIMESTAMP_MAX_VALUE);

      ImmutableList<Event<T>> newEvents =

          ImmutableList.<Event<T>>builder()

              .addAll(events)

              .add(WatermarkEvent.advanceTo(newWatermark))

              .build();

      return new Builder<>(coder, newEvents, newWatermark);

    }



    /**

     * Advance the processing time by the specified amount.

     *

     * @return A {@link TestStream.Builder} like this one that will advance the processing time by

     *     the specified amount after all earlier events have completed.

     */

    public Builder<T> advanceProcessingTime(Duration amount) {

      checkArgument(

          amount.getMillis() > 0,

          "Must advance the processing time by a positive amount. Got: ",

          amount);

      ImmutableList<Event<T>> newEvents =

          ImmutableList.<Event<T>>builder()

              .addAll(events)

              .add(ProcessingTimeEvent.advanceBy(amount))

              .build();

      return new Builder<>(coder, newEvents, currentWatermark);

    }



    /**

     * Advance the watermark to infinity, completing this {@link TestStream}. Future calls to the

     * same builder will not affect the returned {@link TestStream}.

     */

    public TestStream<T> advanceWatermarkToInfinity() {

      ImmutableList<Event<T>> newEvents =

          ImmutableList.<Event<T>>builder()

              .addAll(events)

              .add(WatermarkEvent.advanceTo(BoundedWindow.TIMESTAMP_MAX_VALUE))

              .build();

      return new TestStream<>(coder, newEvents);

    }

  }



  /**

   * An event in a {@link TestStream}. A marker interface for all events that happen while

   * evaluating a {@link TestStream}.

   */

  public interface Event<T> {

    EventType getType();

  }



  /** The types of {@link Event} that are supported by {@link TestStream}. */

  public enum EventType {

    ELEMENT,

    WATERMARK,

    PROCESSING_TIME

  }



  /** A {@link Event} that produces elements. */

  @AutoValue

  public abstract static class ElementEvent<T> implements Event<T> {

    public abstract Iterable<TimestampedValue<T>> getElements();



    @SafeVarargs

    static <T> Event<T> add(TimestampedValue<T> element, TimestampedValue<T>... elements) {

      return add(ImmutableList.<TimestampedValue<T>>builder().add(element).add(elements).build());

    }



    /** <b>For internal use only: no backwards compatibility guarantees.</b> */

    @Internal

    public static <T> Event<T> add(Iterable<TimestampedValue<T>> elements) {

      return new AutoValue_TestStream_ElementEvent<>(EventType.ELEMENT, elements);

    }

  }



  /** A {@link Event} that advances the watermark. */

  @AutoValue

  public abstract static class WatermarkEvent<T> implements Event<T> {

    public abstract Instant getWatermark();



    /** <b>For internal use only: no backwards compatibility guarantees.</b> */

    @Internal

    public static <T> Event<T> advanceTo(Instant newWatermark) {

      return new AutoValue_TestStream_WatermarkEvent<>(EventType.WATERMARK, newWatermark);

    }

  }



  /** A {@link Event} that advances the processing time clock. */

  @AutoValue

  public abstract static class ProcessingTimeEvent<T> implements Event<T> {

    public abstract Duration getProcessingTimeAdvance();



    /** <b>For internal use only: no backwards compatibility guarantees.</b> */

    @Internal

    public static <T> Event<T> advanceBy(Duration amount) {

      return new AutoValue_TestStream_ProcessingTimeEvent<>(EventType.PROCESSING_TIME, amount);

    }

  }



  @Override

  public PCollection<T> expand(PBegin input) {

    return PCollection.createPrimitiveOutputInternal(

        input.getPipeline(), WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED, coder);

  }



  public Coder<T> getValueCoder() {

    return coder;

  }



  /**

   * Returns the sequence of {@link Event Events} in this {@link TestStream}.

   *

   * <p>For use by {@link PipelineRunner} authors.

   */

  public List<Event<T>> getEvents() {

    return events;

  }



  /**

   * <b>For internal use only. No backwards-compatibility guarantees.</b>

   *

   * <p>Builder a test stream directly from events. No validation is performed on watermark

   * monotonicity, etc. This is assumed to be a previously-serialized {@link TestStream} transform

   * that is correct by construction.

   */

  @Internal

  public static <T> TestStream<T> fromRawEvents(Coder<T> coder, List<Event<T>> events) {

    return new TestStream<>(coder, events);

  }



  @Override

  public boolean equals(Object other) {

    if (!(other instanceof TestStream)) {

      return false;

    }

    TestStream<?> that = (TestStream<?>) other;



    return getValueCoder().equals(that.getValueCoder()) && getEvents().equals(that.getEvents());

  }



  @Override

  public int hashCode() {

    return Objects.hash(TestStream.class, getValueCoder(), getEvents());

  }



  /** Coder for {@link TestStream}. */

  public static class TestStreamCoder<T> extends StructuredCoder<TestStream<T>> {



    private final TimestampedValue.TimestampedValueCoder<T> elementCoder;



    public static <T> TestStreamCoder<T> of(Coder<T> valueCoder) {

      return new TestStreamCoder<>(valueCoder);

    }



    private TestStreamCoder(Coder<T> valueCoder) {

      this.elementCoder = TimestampedValue.TimestampedValueCoder.of(valueCoder);

    }



    @Override

    public void encode(TestStream<T> value, OutputStream outStream) throws IOException {

      List<Event<T>> events = value.getEvents();

      VarIntCoder.of().encode(events.size(), outStream);



      for (Event event : events) {

        if (event instanceof ElementEvent) {

          outStream.write(event.getType().ordinal());

          Iterable<TimestampedValue<T>> elements = ((ElementEvent) event).getElements();

          VarIntCoder.of().encode(Iterables.size(elements), outStream);

          for (TimestampedValue<T> element : elements) {

            elementCoder.encode(element, outStream);

          }

        } else if (event instanceof WatermarkEvent) {

          outStream.write(event.getType().ordinal());

          Instant watermark = ((WatermarkEvent) event).getWatermark();

          InstantCoder.of().encode(watermark, outStream);

        } else if (event instanceof ProcessingTimeEvent) {

          outStream.write(event.getType().ordinal());

          Duration processingTimeAdvance = ((ProcessingTimeEvent) event).getProcessingTimeAdvance();

          DurationCoder.of().encode(processingTimeAdvance, outStream);

        }

      }

    }



    @Override

    public TestStream<T> decode(InputStream inStream) throws IOException {

      Integer numberOfEvents = VarIntCoder.of().decode(inStream);

      List<Event<T>> events = new ArrayList<>(numberOfEvents);



      for (int i = 0; i < numberOfEvents; i++) {

        EventType eventType = EventType.values()[inStream.read()];

        switch (eventType) {

          case ELEMENT:

            int numElements = VarIntCoder.of().decode(inStream);

            List<TimestampedValue<T>> elements = new ArrayList<>(numElements);

            for (int j = 0; j < numElements; j++) {

              elements.add(elementCoder.decode(inStream));

            }

            events.add(ElementEvent.add(elements));

            break;

          case WATERMARK:

            Instant watermark = InstantCoder.of().decode(inStream);

            events.add(WatermarkEvent.advanceTo(watermark));

            break;

          case PROCESSING_TIME:

            Duration duration = DurationCoder.of().decode(inStream).toDuration();

            events.add(ProcessingTimeEvent.advanceBy(duration));

            break;

          default:

            throw new IllegalStateException("Unknown event type + " + eventType);

        }

      }

      return TestStream.fromRawEvents(elementCoder.getValueCoder(), events);

    }



    @Override

    public List<? extends Coder<?>> getCoderArguments() {

      return Collections.singletonList(elementCoder);

    }



    @Override

    public void verifyDeterministic() throws NonDeterministicException {}

  }

}